package com.uxsino.reactorq.commons;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicSession;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Subscriber} that sends messages to a JMS topic. See {@link TopicConnection}.
 *
 * @param <T> type of the object to be sent
 */
public class TopicSubscriber<T> extends JMSSubscriber<T> {

    private static final Logger logger = LoggerFactory.getLogger(TopicSubscriber.class);

    /**
     * Creates a topic subscriber for a topic identified by name.
     *
     * @param conn the connection opened for this subscriber
     * @param topic topic name
     * @param cls type of the object to be sent
     * @throws JMSException if the superclass constructor fails
     */
    public TopicSubscriber(TopicConnection conn, String topic, Class<T> cls) throws JMSException {
        super(conn, topic, cls);
    }

    /**
     * Creates a topic subscriber for an already resolved {@link Topic}.
     *
     * @param conn the connection opened for this subscriber
     * @param topic the topic destination to publish to
     * @param cls type of the object to be sent
     * @throws JMSException if the superclass constructor fails
     */
    public TopicSubscriber(TopicConnection conn, Topic topic, Class<T> cls) throws JMSException {
        super(conn, topic, cls);
    }

    /**
     * Opens a non-transacted, auto-acknowledge topic session on the given connection.
     *
     * @param conn the connection; must be a {@link TopicConnection}
     * @return the newly created topic session
     * @throws JMSException if the session cannot be created
     */
    @Override
    protected Session openSession(Connection conn) throws JMSException {
        return ((TopicConnection) conn).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Creates the topic named by the inherited {@code destinationName} on the given session.
     *
     * @param session the session; must be a {@link TopicSession}
     * @return the topic destination
     * @throws JMSException if the topic cannot be created
     */
    @Override
    protected Destination openDestination(Session session) throws JMSException {
        return ((TopicSession) session).createTopic(destinationName);
    }

    /**
     * Creates a publisher for the inherited {@code destination} and applies the
     * {@code TIME_TO_LIVE} setting to it.
     *
     * @param session the session; must be a {@link TopicSession}
     * @return the configured producer
     * @throws JMSException if the publisher cannot be created or configured
     */
    @Override
    protected MessageProducer openProducter(Session session) throws JMSException {
        MessageProducer producer = ((TopicSession) session).createPublisher((Topic) destination);
        producer.setTimeToLive(TIME_TO_LIVE);
        return producer;
    }

    /**
     * Checks whether the session of this subscriber is unusable.
     *
     * <p>When the session is an {@link ActiveMQTopicSession} whose wrapped
     * {@link ActiveMQSession} reports itself closed, the session of this subscriber is
     * disposed as a side effect.
     *
     * @return {@code true} if there is no session, or the wrapped ActiveMQ session is closed;
     *         {@code false} otherwise, including for session types this method cannot inspect
     */
    public boolean checkSessionClosed() {
        if (this.session == null) {
            return true;
        }
        if (!(this.session instanceof ActiveMQTopicSession)) {
            return false;
        }

        // The topic session is a wrapper; the closed state lives on the session it delegates to.
        Session delegate = ((ActiveMQTopicSession) this.session).getNext();
        // instanceof is false for null, so no separate null check is needed.
        if (delegate instanceof ActiveMQSession && ((ActiveMQSession) delegate).isClosed()) {
            // The whole connection must not be closed here, because all topics share the
            // same connection; only the session is disposed.
            super.disposeSession();
            logger.info("ActiveMQTopicSession is closed!");
            return true;
        }
        return false;
    }
}
